import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * canal client api 的使用
 * https://github.com/alibaba/canal/wiki/ClientExample
 * 测试过程中发现，如果修改一个sql语句，但是修改的值没有发生变化，则此处不会监控到。
 * 同一个客户端启动多次，只有一个客户端可以获取到数据
 *
 * @author huan.fu 2021/5/31 - 上午10:31
 */
public class CanalClientApi {
    public static void main(String[] args) {

        String destination = "customer";
        // 创建一个 canal 链接
        // CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), destination, "admin", "admin");
        // 创建一个集群连接
        CanalConnector canalConnector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
        // 链接对应的canal server
        canalConnector.connect();
        // 订阅那个库的那个表等
        /**
         * 订阅规则
         * 1.  所有表：.*   or  .*\\..*
         * 2.  canal schema下所有表： canal\\..*
         * 3.  canal下的以canal打头的表：canal\\.canal.*
         * 4.  canal schema下的一张表：canal\\.test1
         * 5.  多个规则组合使用：canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
         */
        canalConnector.subscribe("temp_work\\.customer");
        // 回滚到未进行 #ack 的地方，下次fetch的时候，可以从最后一个没有 #ack 的地方开始拿
        canalConnector.rollback();
        int batchSize = 1000;
        while (true) {
            // 获取一批数据，不一定会获取到 batchSize 条
            Message message = canalConnector.getWithoutAck(batchSize);
            // 获取批次id
            long batchId = message.getId();
            // 获取数据
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId == -1 || entries.isEmpty()) {
                System.out.println("没有获取到数据");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("解析binlog数据出现异常 , data:" + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
                    System.out.println("sql => " + rowChange.getSql());
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
            canalConnector.ack(batchId);
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
